tokio\sync\mpsc/
bounded.rs

1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7    use crate::sync::mpsc::error::SendTimeoutError;
8    use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
    /// Transmit handle onto the shared channel, parameterized by the bounded
    /// channel's `Semaphore`.
    chan: chan::Tx<T, Semaphore>,
}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// # #[tokio::main(flavor = "current_thread")]
44/// # async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// # }
55/// ```
pub struct WeakSender<T> {
    /// Shared pointer to the channel state itself (a `chan::Chan`), rather
    /// than the `chan::Tx` handle that a `Sender` holds.
    chan: Arc<chan::Chan<T, Semaphore>>,
}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
pub struct Permit<'a, T> {
    /// Borrowed transmit handle; the `'a` lifetime ties the permit to that
    /// borrow.
    chan: &'a chan::Tx<T, Semaphore>,
}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
pub struct PermitIterator<'a, T> {
    /// Borrowed transmit handle the held slots belong to.
    chan: &'a chan::Tx<T, Semaphore>,
    /// Count of slots tracked by this iterator.
    /// NOTE(review): presumably the permits not yet yielded — confirm against
    /// the `Iterator` impl, which is outside this view.
    n: usize,
}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
    /// Owned transmit handle (moved in rather than borrowed, unlike `Permit`).
    /// NOTE(review): the `Option` presumably lets the handle be taken out when
    /// the permit is consumed or released — confirm against the impl.
    chan: Option<chan::Tx<T, Semaphore>>,
}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
    /// The channel receiver: the `chan::Rx` half that the methods of this
    /// type delegate to.
    chan: chan::Rx<T, Semaphore>,
}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages.  Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0, or too large. Currently the maximum
131/// capacity is [`Semaphore::MAX_PERMITS`].
132///
133/// [`Semaphore::MAX_PERMITS`]: crate::sync::Semaphore::MAX_PERMITS
134///
135/// # Examples
136///
137/// ```rust
138/// use tokio::sync::mpsc;
139///
140/// # #[tokio::main(flavor = "current_thread")]
141/// # async fn main() {
142/// let (tx, mut rx) = mpsc::channel(100);
143///
144/// tokio::spawn(async move {
145///     for i in 0..10 {
146///         if let Err(_) = tx.send(i).await {
147///             println!("receiver dropped");
148///             return;
149///         }
150///     }
151/// });
152///
153/// while let Some(i) = rx.recv().await {
154///      println!("got = {}", i);
155/// }
156/// # }
157/// ```
158#[track_caller]
159pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
160    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
161    let semaphore = Semaphore {
162        semaphore: semaphore::Semaphore::new(buffer),
163        bound: buffer,
164    };
165    let (tx, rx) = chan::channel(semaphore);
166
167    let tx = Sender::new(tx);
168    let rx = Receiver::new(rx);
169
170    (tx, rx)
171}
172
/// Channel semaphore pairs the semaphore implementation with a `usize`
/// representing the channel bound.
#[derive(Debug)]
pub(crate) struct Semaphore {
    /// Underlying permit counter; its available permits are what
    /// `Receiver::capacity` reports.
    pub(crate) semaphore: semaphore::Semaphore,
    /// Buffer size the channel was created with; reported by
    /// `Receiver::max_capacity`.
    pub(crate) bound: usize,
}
180
181impl<T> Receiver<T> {
182    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
183        Receiver { chan }
184    }
185
186    /// Receives the next value for this receiver.
187    ///
188    /// This method returns `None` if the channel has been closed and there are
189    /// no remaining messages in the channel's buffer. This indicates that no
190    /// further values can ever be received from this `Receiver`. The channel is
191    /// closed when all senders have been dropped, or when [`close`] is called.
192    ///
193    /// If there are no messages in the channel's buffer, but the channel has
194    /// not yet been closed, this method will sleep until a message is sent or
195    /// the channel is closed.  Note that if [`close`] is called, but there are
196    /// still outstanding [`Permits`] from before it was closed, the channel is
197    /// not considered closed by `recv` until the permits are released.
198    ///
199    /// # Cancel safety
200    ///
201    /// This method is cancel safe. If `recv` is used as the event in a
202    /// [`tokio::select!`](crate::select) statement and some other branch
203    /// completes first, it is guaranteed that no messages were received on this
204    /// channel.
205    ///
206    /// [`close`]: Self::close
207    /// [`Permits`]: struct@crate::sync::mpsc::Permit
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// use tokio::sync::mpsc;
213    ///
214    /// # #[tokio::main(flavor = "current_thread")]
215    /// # async fn main() {
216    /// let (tx, mut rx) = mpsc::channel(100);
217    ///
218    /// tokio::spawn(async move {
219    ///     tx.send("hello").await.unwrap();
220    /// });
221    ///
222    /// assert_eq!(Some("hello"), rx.recv().await);
223    /// assert_eq!(None, rx.recv().await);
224    /// # }
225    /// ```
226    ///
227    /// Values are buffered:
228    ///
229    /// ```
230    /// use tokio::sync::mpsc;
231    ///
232    /// # #[tokio::main(flavor = "current_thread")]
233    /// # async fn main() {
234    /// let (tx, mut rx) = mpsc::channel(100);
235    ///
236    /// tx.send("hello").await.unwrap();
237    /// tx.send("world").await.unwrap();
238    ///
239    /// assert_eq!(Some("hello"), rx.recv().await);
240    /// assert_eq!(Some("world"), rx.recv().await);
241    /// # }
242    /// ```
243    pub async fn recv(&mut self) -> Option<T> {
244        use std::future::poll_fn;
245        poll_fn(|cx| self.chan.recv(cx)).await
246    }
247
248    /// Receives the next values for this receiver and extends `buffer`.
249    ///
250    /// This method extends `buffer` by no more than a fixed number of values
251    /// as specified by `limit`. If `limit` is zero, the function immediately
252    /// returns `0`. The return value is the number of values added to `buffer`.
253    ///
254    /// For `limit > 0`, if there are no messages in the channel's queue, but
255    /// the channel has not yet been closed, this method will sleep until a
256    /// message is sent or the channel is closed. Note that if [`close`] is
257    /// called, but there are still outstanding [`Permits`] from before it was
258    /// closed, the channel is not considered closed by `recv_many` until the
259    /// permits are released.
260    ///
261    /// For non-zero values of `limit`, this method will never return `0` unless
262    /// the channel has been closed and there are no remaining messages in the
263    /// channel's queue. This indicates that no further values can ever be
264    /// received from this `Receiver`. The channel is closed when all senders
265    /// have been dropped, or when [`close`] is called.
266    ///
267    /// The capacity of `buffer` is increased as needed.
268    ///
269    /// # Cancel safety
270    ///
271    /// This method is cancel safe. If `recv_many` is used as the event in a
272    /// [`tokio::select!`](crate::select) statement and some other branch
273    /// completes first, it is guaranteed that no messages were received on this
274    /// channel.
275    ///
276    /// [`close`]: Self::close
277    /// [`Permits`]: struct@crate::sync::mpsc::Permit
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// use tokio::sync::mpsc;
283    ///
284    /// # #[tokio::main(flavor = "current_thread")]
285    /// # async fn main() {
286    /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
287    /// let limit = 2;
288    /// let (tx, mut rx) = mpsc::channel(100);
289    /// let tx2 = tx.clone();
290    /// tx2.send("first").await.unwrap();
291    /// tx2.send("second").await.unwrap();
292    /// tx2.send("third").await.unwrap();
293    ///
294    /// // Call `recv_many` to receive up to `limit` (2) values.
295    /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
296    /// assert_eq!(vec!["first", "second"], buffer);
297    ///
298    /// // If the buffer is full, the next call to `recv_many`
299    /// // reserves additional capacity.
300    /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
301    ///
302    /// tokio::spawn(async move {
303    ///     tx.send("fourth").await.unwrap();
304    /// });
305    ///
306    /// // 'tx' is dropped, but `recv_many`
307    /// // is guaranteed not to return 0 as the channel
308    /// // is not yet closed.
309    /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
310    /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
311    ///
312    /// // Once the last sender is dropped, the channel is
313    /// // closed and `recv_many` returns 0, capacity unchanged.
314    /// drop(tx2);
315    /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
316    /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
317    /// # }
318    /// ```
319    pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
320        use std::future::poll_fn;
321        poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
322    }
323
324    /// Tries to receive the next value for this receiver.
325    ///
326    /// This method returns the [`Empty`] error if the channel is currently
327    /// empty, but there are still outstanding [senders] or [permits].
328    ///
329    /// This method returns the [`Disconnected`] error if the channel is
330    /// currently empty, and there are no outstanding [senders] or [permits].
331    ///
332    /// Unlike the [`poll_recv`] method, this method will never return an
333    /// [`Empty`] error spuriously.
334    ///
335    /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
336    /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
337    /// [`poll_recv`]: Self::poll_recv
338    /// [senders]: crate::sync::mpsc::Sender
339    /// [permits]: crate::sync::mpsc::Permit
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use tokio::sync::mpsc;
345    /// use tokio::sync::mpsc::error::TryRecvError;
346    ///
347    /// # #[tokio::main(flavor = "current_thread")]
348    /// # async fn main() {
349    /// let (tx, mut rx) = mpsc::channel(100);
350    ///
351    /// tx.send("hello").await.unwrap();
352    ///
353    /// assert_eq!(Ok("hello"), rx.try_recv());
354    /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
355    ///
356    /// tx.send("hello").await.unwrap();
357    /// // Drop the last sender, closing the channel.
358    /// drop(tx);
359    ///
360    /// assert_eq!(Ok("hello"), rx.try_recv());
361    /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
362    /// # }
363    /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Pure delegation: the non-blocking receive is implemented by the
        // underlying `chan::Rx`, and its result is returned unchanged.
        self.chan.try_recv()
    }
367
368    /// Blocking receive to call outside of asynchronous contexts.
369    ///
370    /// This method returns `None` if the channel has been closed and there are
371    /// no remaining messages in the channel's buffer. This indicates that no
372    /// further values can ever be received from this `Receiver`. The channel is
373    /// closed when all senders have been dropped, or when [`close`] is called.
374    ///
375    /// If there are no messages in the channel's buffer, but the channel has
376    /// not yet been closed, this method will block until a message is sent or
377    /// the channel is closed.
378    ///
379    /// This method is intended for use cases where you are sending from
380    /// asynchronous code to synchronous code, and will work even if the sender
381    /// is not using [`blocking_send`] to send the message.
382    ///
383    /// Note that if [`close`] is called, but there are still outstanding
384    /// [`Permits`] from before it was closed, the channel is not considered
385    /// closed by `blocking_recv` until the permits are released.
386    ///
387    /// [`close`]: Self::close
388    /// [`Permits`]: struct@crate::sync::mpsc::Permit
389    /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
390    ///
391    /// # Panics
392    ///
393    /// This function panics if called within an asynchronous execution
394    /// context.
395    ///
396    /// # Examples
397    ///
398    /// ```
399    /// # #[cfg(not(target_family = "wasm"))]
400    /// # {
401    /// use std::thread;
402    /// use tokio::runtime::Runtime;
403    /// use tokio::sync::mpsc;
404    ///
405    /// fn main() {
406    ///     let (tx, mut rx) = mpsc::channel::<u8>(10);
407    ///
408    ///     let sync_code = thread::spawn(move || {
409    ///         assert_eq!(Some(10), rx.blocking_recv());
410    ///     });
411    ///
412    ///     Runtime::new()
413    ///         .unwrap()
414    ///         .block_on(async move {
415    ///             let _ = tx.send(10).await;
416    ///         });
417    ///     sync_code.join().unwrap()
418    /// }
419    /// # }
420    /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(&mut self) -> Option<T> {
        // Reuse the async `recv` path and hand its future to the crate's
        // `block_on` helper instead of duplicating the receive logic.
        crate::future::block_on(self.recv())
    }
427
428    /// Variant of [`Self::recv_many`] for blocking contexts.
429    ///
430    /// The same conditions as in [`Self::blocking_recv`] apply.
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
    pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
        // Same approach as `blocking_recv`: the async `recv_many` future is
        // handed to the crate's `block_on` helper.
        crate::future::block_on(self.recv_many(buffer, limit))
    }
437
438    /// Closes the receiving half of a channel without dropping it.
439    ///
440    /// This prevents any further messages from being sent on the channel while
441    /// still enabling the receiver to drain messages that are buffered. Any
442    /// outstanding [`Permit`] values will still be able to send messages.
443    ///
444    /// To guarantee that no messages are dropped, after calling `close()`,
445    /// `recv()` must be called until `None` is returned. If there are
446    /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
447    /// not return `None` until those are released.
448    ///
449    /// [`Permit`]: Permit
450    /// [`OwnedPermit`]: OwnedPermit
451    ///
452    /// # Examples
453    ///
454    /// ```
455    /// use tokio::sync::mpsc;
456    ///
457    /// # #[tokio::main(flavor = "current_thread")]
458    /// # async fn main() {
459    /// let (tx, mut rx) = mpsc::channel(20);
460    ///
461    /// tokio::spawn(async move {
462    ///     let mut i = 0;
463    ///     while let Ok(permit) = tx.reserve().await {
464    ///         permit.send(i);
465    ///         i += 1;
466    ///     }
467    /// });
468    ///
469    /// rx.close();
470    ///
471    /// while let Some(msg) = rx.recv().await {
472    ///     println!("got {}", msg);
473    /// }
474    ///
475    /// // Channel closed and no messages are lost.
476    /// # }
477    /// ```
    pub fn close(&mut self) {
        // Closing is delegated to the underlying channel half; the receiver
        // itself is not consumed, so it can keep draining buffered messages.
        self.chan.close();
    }
481
482    /// Checks if a channel is closed.
483    ///
484    /// This method returns `true` if the channel has been closed. The channel is closed
    /// when all [`Sender`] handles have been dropped, or when [`Receiver::close`] is called.
486    ///
487    /// [`Sender`]: crate::sync::mpsc::Sender
488    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
489    ///
490    /// # Examples
491    /// ```
492    /// use tokio::sync::mpsc;
493    ///
494    /// # #[tokio::main(flavor = "current_thread")]
495    /// # async fn main() {
496    /// let (_tx, mut rx) = mpsc::channel::<()>(10);
497    /// assert!(!rx.is_closed());
498    ///
499    /// rx.close();
500    ///
501    /// assert!(rx.is_closed());
502    /// # }
503    /// ```
    pub fn is_closed(&self) -> bool {
        // The closed flag is tracked by the channel; this only reads it.
        self.chan.is_closed()
    }
507
508    /// Checks if a channel is empty.
509    ///
510    /// This method returns `true` if the channel has no messages.
511    ///
512    /// # Examples
513    /// ```
514    /// use tokio::sync::mpsc;
515    ///
516    /// # #[tokio::main(flavor = "current_thread")]
517    /// # async fn main() {
518    /// let (tx, rx) = mpsc::channel(10);
519    /// assert!(rx.is_empty());
520    ///
521    /// tx.send(0).await.unwrap();
522    /// assert!(!rx.is_empty());
523    /// # }
524    ///
525    /// ```
    pub fn is_empty(&self) -> bool {
        // Answered by the underlying `chan::Rx`; takes `&self`, so no message
        // is removed by the check.
        self.chan.is_empty()
    }
529
530    /// Returns the number of messages in the channel.
531    ///
532    /// # Examples
533    /// ```
534    /// use tokio::sync::mpsc;
535    ///
536    /// # #[tokio::main(flavor = "current_thread")]
537    /// # async fn main() {
538    /// let (tx, rx) = mpsc::channel(10);
539    /// assert_eq!(0, rx.len());
540    ///
541    /// tx.send(0).await.unwrap();
542    /// assert_eq!(1, rx.len());
543    /// # }
544    /// ```
    pub fn len(&self) -> usize {
        // Message count as reported by the underlying `chan::Rx`.
        self.chan.len()
    }
548
549    /// Returns the current capacity of the channel.
550    ///
551    /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
552    /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
    /// This is distinct from [`max_capacity`], which always returns the buffer capacity
    /// initially specified when calling [`channel`].
555    ///
556    /// # Examples
557    ///
558    /// ```
559    /// use tokio::sync::mpsc;
560    ///
561    /// # #[tokio::main(flavor = "current_thread")]
562    /// # async fn main() {
563    /// let (tx, mut rx) = mpsc::channel::<()>(5);
564    ///
565    /// assert_eq!(rx.capacity(), 5);
566    ///
567    /// // Making a reservation drops the capacity by one.
568    /// let permit = tx.reserve().await.unwrap();
569    /// assert_eq!(rx.capacity(), 4);
570    /// assert_eq!(rx.len(), 0);
571    ///
572    /// // Sending and receiving a value increases the capacity by one.
573    /// permit.send(());
574    /// assert_eq!(rx.len(), 1);
575    /// rx.recv().await.unwrap();
576    /// assert_eq!(rx.capacity(), 5);
577    ///
578    /// // Directly sending a message drops the capacity by one.
579    /// tx.send(()).await.unwrap();
580    /// assert_eq!(rx.capacity(), 4);
581    /// assert_eq!(rx.len(), 1);
582    ///
583    /// // Receiving the message increases the capacity by one.
584    /// rx.recv().await.unwrap();
585    /// assert_eq!(rx.capacity(), 5);
586    /// assert_eq!(rx.len(), 0);
587    /// # }
588    /// ```
589    /// [`capacity`]: Receiver::capacity
590    /// [`max_capacity`]: Receiver::max_capacity
591    pub fn capacity(&self) -> usize {
592        self.chan.semaphore().semaphore.available_permits()
593    }
594
595    /// Returns the maximum buffer capacity of the channel.
596    ///
597    /// The maximum capacity is the buffer capacity initially specified when calling
598    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
599    /// available buffer capacity: as messages are sent and received, the value
600    /// returned by [`capacity`] will go up or down, whereas the value
601    /// returned by [`max_capacity`] will remain constant.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// use tokio::sync::mpsc;
607    ///
608    /// # #[tokio::main(flavor = "current_thread")]
609    /// # async fn main() {
610    /// let (tx, rx) = mpsc::channel::<()>(5);
611    ///
612    /// // both max capacity and capacity are the same at first
613    /// assert_eq!(rx.max_capacity(), 5);
614    /// assert_eq!(rx.capacity(), 5);
615    ///
616    /// // Making a reservation doesn't change the max capacity.
617    /// let permit = tx.reserve().await.unwrap();
618    /// assert_eq!(rx.max_capacity(), 5);
619    /// // but drops the capacity by one
620    /// assert_eq!(rx.capacity(), 4);
621    /// # }
622    /// ```
623    /// [`capacity`]: Receiver::capacity
624    /// [`max_capacity`]: Receiver::max_capacity
625    pub fn max_capacity(&self) -> usize {
626        self.chan.semaphore().bound
627    }
628
629    /// Polls to receive the next message on this channel.
630    ///
631    /// This method returns:
632    ///
633    ///  * `Poll::Pending` if no messages are available but the channel is not
634    ///    closed, or if a spurious failure happens.
635    ///  * `Poll::Ready(Some(message))` if a message is available.
636    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
637    ///    sent before it was closed have been received.
638    ///
639    /// When the method returns `Poll::Pending`, the `Waker` in the provided
640    /// `Context` is scheduled to receive a wakeup when a message is sent on any
    /// sender, or when the channel is closed.  Note that on multiple calls to
642    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
643    /// passed to the most recent call is scheduled to receive a wakeup.
644    ///
645    /// If this method returns `Poll::Pending` due to a spurious failure, then
646    /// the `Waker` will be notified when the situation causing the spurious
647    /// failure has been resolved. Note that receiving such a wakeup does not
648    /// guarantee that the next call will succeed — it could fail with another
649    /// spurious failure.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Direct access to the poll-based channel receive, for callers that
        // implement their own futures.
        self.chan.recv(cx)
    }
653
654    /// Polls to receive multiple messages on this channel, extending the provided buffer.
655    ///
656    /// This method returns:
657    /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
658    ///   spurious failure happens.
659    /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
660    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
661    /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
662    ///
663    /// When the method returns `Poll::Pending`, the `Waker` in the provided
664    /// `Context` is scheduled to receive a wakeup when a message is sent on any
    /// sender, or when the channel is closed.  Note that on multiple calls to
666    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
667    /// passed to the most recent call is scheduled to receive a wakeup.
668    ///
669    /// Note that this method does not guarantee that exactly `limit` messages
670    /// are received. Rather, if at least one message is available, it returns
671    /// as many messages as it can up to the given limit. This method returns
672    /// zero only if the channel is closed (or if `limit` is zero).
673    ///
674    /// # Examples
675    ///
676    /// ```
677    /// use std::task::{Context, Poll};
678    /// use std::pin::Pin;
679    /// use tokio::sync::mpsc;
680    /// use futures::Future;
681    ///
682    /// struct MyReceiverFuture<'a> {
683    ///     receiver: mpsc::Receiver<i32>,
684    ///     buffer: &'a mut Vec<i32>,
685    ///     limit: usize,
686    /// }
687    ///
688    /// impl<'a> Future for MyReceiverFuture<'a> {
689    ///     type Output = usize; // Number of messages received
690    ///
691    ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
692    ///         let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
693    ///
694    ///         // Now `receiver` and `buffer` are mutable references, and `limit` is copied
695    ///         match receiver.poll_recv_many(cx, *buffer, *limit) {
696    ///             Poll::Pending => Poll::Pending,
697    ///             Poll::Ready(count) => Poll::Ready(count),
698    ///         }
699    ///     }
700    /// }
701    ///
702    /// # #[tokio::main(flavor = "current_thread")]
703    /// # async fn main() {
704    /// let (tx, rx) = mpsc::channel(32);
705    /// let mut buffer = Vec::new();
706    ///
707    /// let my_receiver_future = MyReceiverFuture {
708    ///     receiver: rx,
709    ///     buffer: &mut buffer,
710    ///     limit: 3,
711    /// };
712    ///
713    /// for i in 0..10 {
714    ///     tx.send(i).await.unwrap();
715    /// }
716    ///
717    /// let count = my_receiver_future.await;
718    /// assert_eq!(count, 3);
719    /// assert_eq!(buffer, vec![0,1,2])
720    /// # }
721    /// ```
    pub fn poll_recv_many(
        &mut self,
        cx: &mut Context<'_>,
        buffer: &mut Vec<T>,
        limit: usize,
    ) -> Poll<usize> {
        // Direct access to the poll-based batch receive; `buffer` and `limit`
        // are passed through untouched.
        self.chan.recv_many(cx, buffer, limit)
    }
730
    /// Returns the number of [`Sender`] handles.
    pub fn sender_strong_count(&self) -> usize {
        // Forwarded to the channel's receive half, which exposes the count.
        self.chan.sender_strong_count()
    }
735
    /// Returns the number of [`WeakSender`] handles.
    pub fn sender_weak_count(&self) -> usize {
        // Forwarded to the channel's receive half, which exposes the count.
        self.chan.sender_weak_count()
    }
740}
741
742impl<T> fmt::Debug for Receiver<T> {
743    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
744        fmt.debug_struct("Receiver")
745            .field("chan", &self.chan)
746            .finish()
747    }
748}
749
// No bound on `T`: `Receiver<T>` is declared `Unpin` for every message type.
impl<T> Unpin for Receiver<T> {}
751
752impl<T> Sender<T> {
753    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
754        Sender { chan }
755    }
756
757    /// Sends a value, waiting until there is capacity.
758    ///
759    /// A successful send occurs when it is determined that the other end of the
760    /// channel has not hung up already. An unsuccessful send would be one where
761    /// the corresponding receiver has already been closed. Note that a return
762    /// value of `Err` means that the data will never be received, but a return
763    /// value of `Ok` does not mean that the data will be received. It is
764    /// possible for the corresponding receiver to hang up immediately after
765    /// this function returns `Ok`.
766    ///
767    /// # Errors
768    ///
769    /// If the receive half of the channel is closed, either due to [`close`]
770    /// being called or the [`Receiver`] handle dropping, the function returns
771    /// an error. The error includes the value passed to `send`.
772    ///
773    /// [`close`]: Receiver::close
774    /// [`Receiver`]: Receiver
775    ///
776    /// # Cancel safety
777    ///
778    /// If `send` is used as the event in a [`tokio::select!`](crate::select)
779    /// statement and some other branch completes first, then it is guaranteed
780    /// that the message was not sent. **However, in that case, the message
781    /// is dropped and will be lost.**
782    ///
783    /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
784    /// capacity, then use the returned [`Permit`] to send the message.
785    ///
786    /// This channel uses a queue to ensure that calls to `send` and `reserve`
787    /// complete in the order they were requested.  Cancelling a call to
788    /// `send` makes you lose your place in the queue.
789    ///
790    /// # Examples
791    ///
792    /// In the following example, each call to `send` will block until the
793    /// previously sent value was received.
794    ///
795    /// ```rust
796    /// use tokio::sync::mpsc;
797    ///
798    /// # #[tokio::main(flavor = "current_thread")]
799    /// # async fn main() {
800    /// let (tx, mut rx) = mpsc::channel(1);
801    ///
802    /// tokio::spawn(async move {
803    ///     for i in 0..10 {
804    ///         if let Err(_) = tx.send(i).await {
805    ///             println!("receiver dropped");
806    ///             return;
807    ///         }
808    ///     }
809    /// });
810    ///
811    /// while let Some(i) = rx.recv().await {
812    ///     println!("got = {}", i);
813    /// }
814    /// # }
815    /// ```
816    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
817        match self.reserve().await {
818            Ok(permit) => {
819                permit.send(value);
820                Ok(())
821            }
822            Err(_) => Err(SendError(value)),
823        }
824    }
825
826    /// Completes when the receiver has dropped.
827    ///
828    /// This allows the producers to get notified when interest in the produced
829    /// values is canceled and immediately stop doing work.
830    ///
831    /// # Cancel safety
832    ///
833    /// This method is cancel safe. Once the channel is closed, it stays closed
834    /// forever and all future calls to `closed` will return immediately.
835    ///
836    /// # Examples
837    ///
838    /// ```
839    /// use tokio::sync::mpsc;
840    ///
841    /// # #[tokio::main(flavor = "current_thread")]
842    /// # async fn main() {
843    /// let (tx1, rx) = mpsc::channel::<()>(1);
844    /// let tx2 = tx1.clone();
845    /// let tx3 = tx1.clone();
846    /// let tx4 = tx1.clone();
847    /// let tx5 = tx1.clone();
848    /// tokio::spawn(async move {
849    ///     drop(rx);
850    /// });
851    ///
852    /// futures::join!(
853    ///     tx1.closed(),
854    ///     tx2.closed(),
855    ///     tx3.closed(),
856    ///     tx4.closed(),
857    ///     tx5.closed()
858    /// );
859    /// println!("Receiver dropped");
860    /// # }
861    /// ```
862    pub async fn closed(&self) {
863        self.chan.closed().await;
864    }
865
    /// Attempts to immediately send a message on this `Sender`.
867    ///
868    /// This method differs from [`send`] by returning immediately if the channel's
869    /// buffer is full or no receiver is waiting to acquire some data. Compared
870    /// with [`send`], this function has two failure cases instead of one (one for
871    /// disconnection, one for a full buffer).
872    ///
873    /// # Errors
874    ///
875    /// If the channel capacity has been reached, i.e., the channel has `n`
876    /// buffered values where `n` is the argument passed to [`channel`], then an
877    /// error is returned.
878    ///
879    /// If the receive half of the channel is closed, either due to [`close`]
880    /// being called or the [`Receiver`] handle dropping, the function returns
881    /// an error. The error includes the value passed to `send`.
882    ///
883    /// [`send`]: Sender::send
884    /// [`channel`]: channel
885    /// [`close`]: Receiver::close
886    ///
887    /// # Examples
888    ///
889    /// ```
890    /// use tokio::sync::mpsc;
891    ///
892    /// # #[tokio::main(flavor = "current_thread")]
893    /// # async fn main() {
894    /// // Create a channel with buffer size 1
895    /// let (tx1, mut rx) = mpsc::channel(1);
896    /// let tx2 = tx1.clone();
897    ///
898    /// tokio::spawn(async move {
899    ///     tx1.send(1).await.unwrap();
900    ///     tx1.send(2).await.unwrap();
901    ///     // task waits until the receiver receives a value.
902    /// });
903    ///
904    /// tokio::spawn(async move {
905    ///     // This will return an error and send
906    ///     // no message if the buffer is full
907    ///     let _ = tx2.try_send(3);
908    /// });
909    ///
910    /// let mut msg;
911    /// msg = rx.recv().await.unwrap();
912    /// println!("message {} received", msg);
913    ///
914    /// msg = rx.recv().await.unwrap();
915    /// println!("message {} received", msg);
916    ///
917    /// // Third message may have never been sent
918    /// match rx.recv().await {
919    ///     Some(msg) => println!("message {} received", msg),
920    ///     None => println!("the third message was never sent"),
921    /// }
922    /// # }
923    /// ```
924    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
925        match self.chan.semaphore().semaphore.try_acquire(1) {
926            Ok(()) => {}
927            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
928            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
929        }
930
931        // Send the message
932        self.chan.send(message);
933        Ok(())
934    }
935
936    /// Sends a value, waiting until there is capacity, but only for a limited time.
937    ///
938    /// Shares the same success and error conditions as [`send`], adding one more
939    /// condition for an unsuccessful send, which is when the provided timeout has
940    /// elapsed, and there is no capacity available.
941    ///
942    /// [`send`]: Sender::send
943    ///
944    /// # Errors
945    ///
946    /// If the receive half of the channel is closed, either due to [`close`]
947    /// being called or the [`Receiver`] having been dropped,
948    /// the function returns an error. The error includes the value passed to `send`.
949    ///
950    /// [`close`]: Receiver::close
951    /// [`Receiver`]: Receiver
952    ///
953    /// # Panics
954    ///
955    /// This function panics if it is called outside the context of a Tokio
956    /// runtime [with time enabled](crate::runtime::Builder::enable_time).
957    ///
958    /// # Examples
959    ///
960    /// In the following example, each call to `send_timeout` will block until the
961    /// previously sent value was received, unless the timeout has elapsed.
962    ///
963    /// ```rust
964    /// use tokio::sync::mpsc;
965    /// use tokio::time::{sleep, Duration};
966    ///
967    /// # #[tokio::main(flavor = "current_thread")]
968    /// # async fn main() {
969    /// let (tx, mut rx) = mpsc::channel(1);
970    ///
971    /// tokio::spawn(async move {
972    ///     for i in 0..10 {
973    ///         if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
974    ///             println!("send error: #{:?}", e);
975    ///             return;
976    ///         }
977    ///     }
978    /// });
979    ///
980    /// while let Some(i) = rx.recv().await {
981    ///     println!("got = {}", i);
982    ///     sleep(Duration::from_millis(200)).await;
983    /// }
984    /// # }
985    /// ```
986    #[cfg(feature = "time")]
987    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
988    pub async fn send_timeout(
989        &self,
990        value: T,
991        timeout: Duration,
992    ) -> Result<(), SendTimeoutError<T>> {
993        let permit = match crate::time::timeout(timeout, self.reserve()).await {
994            Err(_) => {
995                return Err(SendTimeoutError::Timeout(value));
996            }
997            Ok(Err(_)) => {
998                return Err(SendTimeoutError::Closed(value));
999            }
1000            Ok(Ok(permit)) => permit,
1001        };
1002
1003        permit.send(value);
1004        Ok(())
1005    }
1006
1007    /// Blocking send to call outside of asynchronous contexts.
1008    ///
1009    /// This method is intended for use cases where you are sending from
1010    /// synchronous code to asynchronous code, and will work even if the
1011    /// receiver is not using [`blocking_recv`] to receive the message.
1012    ///
1013    /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
1014    ///
1015    /// # Panics
1016    ///
1017    /// This function panics if called within an asynchronous execution
1018    /// context.
1019    ///
1020    /// # Examples
1021    ///
1022    /// ```
1023    /// # #[cfg(not(target_family = "wasm"))]
1024    /// # {
1025    /// use std::thread;
1026    /// use tokio::runtime::Runtime;
1027    /// use tokio::sync::mpsc;
1028    ///
1029    /// fn main() {
1030    ///     let (tx, mut rx) = mpsc::channel::<u8>(1);
1031    ///
1032    ///     let sync_code = thread::spawn(move || {
1033    ///         tx.blocking_send(10).unwrap();
1034    ///     });
1035    ///
1036    ///     Runtime::new().unwrap().block_on(async move {
1037    ///         assert_eq!(Some(10), rx.recv().await);
1038    ///     });
1039    ///     sync_code.join().unwrap()
1040    /// }
1041    /// # }
1042    /// ```
1043    #[track_caller]
1044    #[cfg(feature = "sync")]
1045    #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
1046    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1047        crate::future::block_on(self.send(value))
1048    }
1049
1050    /// Checks if the channel has been closed. This happens when the
1051    /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1052    /// called.
1053    ///
1054    /// [`Receiver`]: crate::sync::mpsc::Receiver
1055    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1056    ///
1057    /// ```
1058    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1059    /// assert!(!tx.is_closed());
1060    ///
1061    /// let tx2 = tx.clone();
1062    /// assert!(!tx2.is_closed());
1063    ///
1064    /// drop(rx);
1065    /// assert!(tx.is_closed());
1066    /// assert!(tx2.is_closed());
1067    /// ```
1068    pub fn is_closed(&self) -> bool {
1069        self.chan.is_closed()
1070    }
1071
1072    /// Waits for channel capacity. Once capacity to send one message is
1073    /// available, it is reserved for the caller.
1074    ///
1075    /// If the channel is full, the function waits for the number of unreceived
1076    /// messages to become less than the channel capacity. Capacity to send one
1077    /// message is reserved for the caller. A [`Permit`] is returned to track
1078    /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1079    /// reserved capacity.
1080    ///
1081    /// Dropping [`Permit`] without sending a message releases the capacity back
1082    /// to the channel.
1083    ///
1084    /// [`Permit`]: Permit
1085    /// [`send`]: Permit::send
1086    ///
1087    /// # Cancel safety
1088    ///
1089    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1090    /// complete in the order they were requested.  Cancelling a call to
1091    /// `reserve` makes you lose your place in the queue.
1092    ///
1093    /// # Examples
1094    ///
1095    /// ```
1096    /// use tokio::sync::mpsc;
1097    ///
1098    /// # #[tokio::main(flavor = "current_thread")]
1099    /// # async fn main() {
1100    /// let (tx, mut rx) = mpsc::channel(1);
1101    ///
1102    /// // Reserve capacity
1103    /// let permit = tx.reserve().await.unwrap();
1104    ///
1105    /// // Trying to send directly on the `tx` will fail due to no
1106    /// // available capacity.
1107    /// assert!(tx.try_send(123).is_err());
1108    ///
1109    /// // Sending on the permit succeeds
1110    /// permit.send(456);
1111    ///
1112    /// // The value sent on the permit is received
1113    /// assert_eq!(rx.recv().await.unwrap(), 456);
1114    /// # }
1115    /// ```
1116    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1117        self.reserve_inner(1).await?;
1118        Ok(Permit { chan: &self.chan })
1119    }
1120
1121    /// Waits for channel capacity. Once capacity to send `n` messages is
1122    /// available, it is reserved for the caller.
1123    ///
1124    /// If the channel is full or if there are fewer than `n` permits available, the function waits
1125    /// for the number of unreceived messages to become `n` less than the channel capacity.
    /// Capacity to send `n` messages is then reserved for the caller.
1127    ///
1128    /// A [`PermitIterator`] is returned to track the reserved capacity.
1129    /// You can call this [`Iterator`] until it is exhausted to
1130    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1131    /// [`try_reserve_many`] except it awaits for the slots to become available.
1132    ///
1133    /// If the channel is closed, the function returns a [`SendError`].
1134    ///
1135    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1136    /// permits back to the channel.
1137    ///
1138    /// [`PermitIterator`]: PermitIterator
1139    /// [`Permit`]: Permit
1140    /// [`send`]: Permit::send
1141    /// [`try_reserve_many`]: Sender::try_reserve_many
1142    ///
1143    /// # Cancel safety
1144    ///
1145    /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1146    /// complete in the order they were requested. Cancelling a call to
1147    /// `reserve_many` makes you lose your place in the queue.
1148    ///
1149    /// # Examples
1150    ///
1151    /// ```
1152    /// use tokio::sync::mpsc;
1153    ///
1154    /// # #[tokio::main(flavor = "current_thread")]
1155    /// # async fn main() {
1156    /// let (tx, mut rx) = mpsc::channel(2);
1157    ///
1158    /// // Reserve capacity
1159    /// let mut permit = tx.reserve_many(2).await.unwrap();
1160    ///
1161    /// // Trying to send directly on the `tx` will fail due to no
1162    /// // available capacity.
1163    /// assert!(tx.try_send(123).is_err());
1164    ///
1165    /// // Sending with the permit iterator succeeds
1166    /// permit.next().unwrap().send(456);
1167    /// permit.next().unwrap().send(457);
1168    ///
1169    /// // The iterator should now be exhausted
1170    /// assert!(permit.next().is_none());
1171    ///
1172    /// // The value sent on the permit is received
1173    /// assert_eq!(rx.recv().await.unwrap(), 456);
1174    /// assert_eq!(rx.recv().await.unwrap(), 457);
1175    /// # }
1176    /// ```
1177    pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1178        self.reserve_inner(n).await?;
1179        Ok(PermitIterator {
1180            chan: &self.chan,
1181            n,
1182        })
1183    }
1184
1185    /// Waits for channel capacity, moving the `Sender` and returning an owned
1186    /// permit. Once capacity to send one message is available, it is reserved
1187    /// for the caller.
1188    ///
1189    /// This moves the sender _by value_, and returns an owned permit that can
1190    /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1191    /// this method may be used in cases where the permit must be valid for the
1192    /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1193    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1194    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1195    /// moved, it can be cloned prior to calling `reserve_owned`.
1196    ///
1197    /// If the channel is full, the function waits for the number of unreceived
1198    /// messages to become less than the channel capacity. Capacity to send one
1199    /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1200    /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1201    /// consumes the reserved capacity.
1202    ///
1203    /// Dropping the [`OwnedPermit`] without sending a message releases the
1204    /// capacity back to the channel.
1205    ///
1206    /// # Cancel safety
1207    ///
1208    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1209    /// complete in the order they were requested.  Cancelling a call to
1210    /// `reserve_owned` makes you lose your place in the queue.
1211    ///
1212    /// # Examples
1213    /// Sending a message using an [`OwnedPermit`]:
1214    /// ```
1215    /// use tokio::sync::mpsc;
1216    ///
1217    /// # #[tokio::main(flavor = "current_thread")]
1218    /// # async fn main() {
1219    /// let (tx, mut rx) = mpsc::channel(1);
1220    ///
1221    /// // Reserve capacity, moving the sender.
1222    /// let permit = tx.reserve_owned().await.unwrap();
1223    ///
1224    /// // Send a message, consuming the permit and returning
1225    /// // the moved sender.
1226    /// let tx = permit.send(123);
1227    ///
1228    /// // The value sent on the permit is received.
1229    /// assert_eq!(rx.recv().await.unwrap(), 123);
1230    ///
1231    /// // The sender can now be used again.
1232    /// tx.send(456).await.unwrap();
1233    /// # }
1234    /// ```
1235    ///
1236    /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1237    /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1238    ///
1239    /// ```
1240    /// use tokio::sync::mpsc;
1241    ///
1242    /// # #[tokio::main(flavor = "current_thread")]
1243    /// # async fn main() {
1244    /// let (tx, mut rx) = mpsc::channel(1);
1245    ///
1246    /// // Clone the sender and reserve capacity.
1247    /// let permit = tx.clone().reserve_owned().await.unwrap();
1248    ///
1249    /// // Trying to send directly on the `tx` will fail due to no
1250    /// // available capacity.
1251    /// assert!(tx.try_send(123).is_err());
1252    ///
1253    /// // Sending on the permit succeeds.
1254    /// permit.send(456);
1255    ///
1256    /// // The value sent on the permit is received
1257    /// assert_eq!(rx.recv().await.unwrap(), 456);
1258    /// # }
1259    /// ```
1260    ///
1261    /// [`Sender::reserve`]: Sender::reserve
1262    /// [`OwnedPermit`]: OwnedPermit
1263    /// [`send`]: OwnedPermit::send
1264    /// [`Arc::clone`]: std::sync::Arc::clone
1265    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1266        self.reserve_inner(1).await?;
1267        Ok(OwnedPermit {
1268            chan: Some(self.chan),
1269        })
1270    }
1271
1272    async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1273        crate::trace::async_trace_leaf().await;
1274
1275        if n > self.max_capacity() {
1276            return Err(SendError(()));
1277        }
1278        match self.chan.semaphore().semaphore.acquire(n).await {
1279            Ok(()) => Ok(()),
1280            Err(_) => Err(SendError(())),
1281        }
1282    }
1283
1284    /// Tries to acquire a slot in the channel without waiting for the slot to become
1285    /// available.
1286    ///
1287    /// If the channel is full this function will return [`TrySendError`], otherwise
1288    /// if there is a slot available it will return a [`Permit`] that will then allow you
1289    /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1290    /// [`reserve`] except it does not await for the slot to become available.
1291    ///
1292    /// Dropping [`Permit`] without sending a message releases the capacity back
1293    /// to the channel.
1294    ///
1295    /// [`Permit`]: Permit
1296    /// [`send`]: Permit::send
1297    /// [`reserve`]: Sender::reserve
1298    ///
1299    /// # Examples
1300    ///
1301    /// ```
1302    /// use tokio::sync::mpsc;
1303    ///
1304    /// # #[tokio::main(flavor = "current_thread")]
1305    /// # async fn main() {
1306    /// let (tx, mut rx) = mpsc::channel(1);
1307    ///
1308    /// // Reserve capacity
1309    /// let permit = tx.try_reserve().unwrap();
1310    ///
1311    /// // Trying to send directly on the `tx` will fail due to no
1312    /// // available capacity.
1313    /// assert!(tx.try_send(123).is_err());
1314    ///
1315    /// // Trying to reserve an additional slot on the `tx` will
1316    /// // fail because there is no capacity.
1317    /// assert!(tx.try_reserve().is_err());
1318    ///
1319    /// // Sending on the permit succeeds
1320    /// permit.send(456);
1321    ///
1322    /// // The value sent on the permit is received
1323    /// assert_eq!(rx.recv().await.unwrap(), 456);
1324    ///
1325    /// # }
1326    /// ```
1327    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1328        match self.chan.semaphore().semaphore.try_acquire(1) {
1329            Ok(()) => {}
1330            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1331            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1332        }
1333
1334        Ok(Permit { chan: &self.chan })
1335    }
1336
1337    /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1338    /// available.
1339    ///
1340    /// A [`PermitIterator`] is returned to track the reserved capacity.
1341    /// You can call this [`Iterator`] until it is exhausted to
1342    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1343    /// [`reserve_many`] except it does not await for the slots to become available.
1344    ///
1345    /// If there are fewer than `n` permits available on the channel, then
1346    /// this function will return a [`TrySendError::Full`]. If the channel is closed
1347    /// this function will return a [`TrySendError::Closed`].
1348    ///
1349    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1350    /// permits back to the channel.
1351    ///
1352    /// [`PermitIterator`]: PermitIterator
1353    /// [`send`]: Permit::send
1354    /// [`reserve_many`]: Sender::reserve_many
1355    ///
1356    /// # Examples
1357    ///
1358    /// ```
1359    /// use tokio::sync::mpsc;
1360    ///
1361    /// # #[tokio::main(flavor = "current_thread")]
1362    /// # async fn main() {
1363    /// let (tx, mut rx) = mpsc::channel(2);
1364    ///
1365    /// // Reserve capacity
1366    /// let mut permit = tx.try_reserve_many(2).unwrap();
1367    ///
1368    /// // Trying to send directly on the `tx` will fail due to no
1369    /// // available capacity.
1370    /// assert!(tx.try_send(123).is_err());
1371    ///
1372    /// // Trying to reserve an additional slot on the `tx` will
1373    /// // fail because there is no capacity.
1374    /// assert!(tx.try_reserve().is_err());
1375    ///
1376    /// // Sending with the permit iterator succeeds
1377    /// permit.next().unwrap().send(456);
1378    /// permit.next().unwrap().send(457);
1379    ///
1380    /// // The iterator should now be exhausted
1381    /// assert!(permit.next().is_none());
1382    ///
1383    /// // The value sent on the permit is received
1384    /// assert_eq!(rx.recv().await.unwrap(), 456);
1385    /// assert_eq!(rx.recv().await.unwrap(), 457);
1386    ///
1387    /// // Trying to call try_reserve_many with 0 will return an empty iterator
1388    /// let mut permit = tx.try_reserve_many(0).unwrap();
1389    /// assert!(permit.next().is_none());
1390    ///
1391    /// // Trying to call try_reserve_many with a number greater than the channel
1392    /// // capacity will return an error
1393    /// let permit = tx.try_reserve_many(3);
1394    /// assert!(permit.is_err());
1395    ///
1396    /// // Trying to call try_reserve_many on a closed channel will return an error
1397    /// drop(rx);
1398    /// let permit = tx.try_reserve_many(1);
1399    /// assert!(permit.is_err());
1400    ///
1401    /// let permit = tx.try_reserve_many(0);
1402    /// assert!(permit.is_err());
1403    /// # }
1404    /// ```
1405    pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1406        if n > self.max_capacity() {
1407            return Err(TrySendError::Full(()));
1408        }
1409
1410        match self.chan.semaphore().semaphore.try_acquire(n) {
1411            Ok(()) => {}
1412            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1413            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1414        }
1415
1416        Ok(PermitIterator {
1417            chan: &self.chan,
1418            n,
1419        })
1420    }
1421
1422    /// Tries to acquire a slot in the channel without waiting for the slot to become
1423    /// available, returning an owned permit.
1424    ///
1425    /// This moves the sender _by value_, and returns an owned permit that can
1426    /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1427    /// this method may be used in cases where the permit must be valid for the
1428    /// `'static` lifetime.  `Sender`s may be cloned cheaply (`Sender::clone` is
1429    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1430    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1431    /// moved, it can be cloned prior to calling `try_reserve_owned`.
1432    ///
1433    /// If the channel is full this function will return a [`TrySendError`].
1434    /// Since the sender is taken by value, the `TrySendError` returned in this
1435    /// case contains the sender, so that it may be used again. Otherwise, if
1436    /// there is a slot available, this method will return an [`OwnedPermit`]
1437    /// that can then be used to [`send`] on the channel with a guaranteed slot.
1438    /// This function is similar to  [`reserve_owned`] except it does not await
1439    /// for the slot to become available.
1440    ///
1441    /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1442    /// to the channel.
1443    ///
1444    /// [`OwnedPermit`]: OwnedPermit
1445    /// [`send`]: OwnedPermit::send
1446    /// [`reserve_owned`]: Sender::reserve_owned
1447    /// [`Arc::clone`]: std::sync::Arc::clone
1448    ///
1449    /// # Examples
1450    ///
1451    /// ```
1452    /// use tokio::sync::mpsc;
1453    ///
1454    /// # #[tokio::main(flavor = "current_thread")]
1455    /// # async fn main() {
1456    /// let (tx, mut rx) = mpsc::channel(1);
1457    ///
1458    /// // Reserve capacity
1459    /// let permit = tx.clone().try_reserve_owned().unwrap();
1460    ///
1461    /// // Trying to send directly on the `tx` will fail due to no
1462    /// // available capacity.
1463    /// assert!(tx.try_send(123).is_err());
1464    ///
1465    /// // Trying to reserve an additional slot on the `tx` will
1466    /// // fail because there is no capacity.
1467    /// assert!(tx.try_reserve().is_err());
1468    ///
1469    /// // Sending on the permit succeeds
1470    /// permit.send(456);
1471    ///
1472    /// // The value sent on the permit is received
1473    /// assert_eq!(rx.recv().await.unwrap(), 456);
1474    ///
1475    /// # }
1476    /// ```
1477    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1478        match self.chan.semaphore().semaphore.try_acquire(1) {
1479            Ok(()) => {}
1480            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1481            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1482        }
1483
1484        Ok(OwnedPermit {
1485            chan: Some(self.chan),
1486        })
1487    }
1488
1489    /// Returns `true` if senders belong to the same channel.
1490    ///
1491    /// # Examples
1492    ///
1493    /// ```
1494    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1495    /// let  tx2 = tx.clone();
1496    /// assert!(tx.same_channel(&tx2));
1497    ///
1498    /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1499    /// assert!(!tx3.same_channel(&tx2));
1500    /// ```
1501    pub fn same_channel(&self, other: &Self) -> bool {
1502        self.chan.same_channel(&other.chan)
1503    }
1504
1505    /// Returns the current capacity of the channel.
1506    ///
1507    /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1508    /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
    /// This is distinct from [`max_capacity`], which always returns the buffer capacity
    /// initially specified when calling [`channel`].
1511    ///
1512    /// # Examples
1513    ///
1514    /// ```
1515    /// use tokio::sync::mpsc;
1516    ///
1517    /// # #[tokio::main(flavor = "current_thread")]
1518    /// # async fn main() {
1519    /// let (tx, mut rx) = mpsc::channel::<()>(5);
1520    ///
1521    /// assert_eq!(tx.capacity(), 5);
1522    ///
1523    /// // Making a reservation drops the capacity by one.
1524    /// let permit = tx.reserve().await.unwrap();
1525    /// assert_eq!(tx.capacity(), 4);
1526    ///
1527    /// // Sending and receiving a value increases the capacity by one.
1528    /// permit.send(());
1529    /// rx.recv().await.unwrap();
1530    /// assert_eq!(tx.capacity(), 5);
1531    /// # }
1532    /// ```
1533    ///
1534    /// [`send`]: Sender::send
1535    /// [`reserve`]: Sender::reserve
1536    /// [`channel`]: channel
1537    /// [`max_capacity`]: Sender::max_capacity
1538    pub fn capacity(&self) -> usize {
1539        self.chan.semaphore().semaphore.available_permits()
1540    }
1541
1542    /// Converts the `Sender` to a [`WeakSender`] that does not count
1543    /// towards RAII semantics, i.e. if all `Sender` instances of the
1544    /// channel were dropped and only `WeakSender` instances remain,
1545    /// the channel is closed.
1546    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1547    pub fn downgrade(&self) -> WeakSender<T> {
1548        WeakSender {
1549            chan: self.chan.downgrade(),
1550        }
1551    }
1552
1553    /// Returns the maximum buffer capacity of the channel.
1554    ///
1555    /// The maximum capacity is the buffer capacity initially specified when calling
1556    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1557    /// available buffer capacity: as messages are sent and received, the
1558    /// value returned by [`capacity`] will go up or down, whereas the value
1559    /// returned by [`max_capacity`] will remain constant.
1560    ///
1561    /// # Examples
1562    ///
1563    /// ```
1564    /// use tokio::sync::mpsc;
1565    ///
1566    /// # #[tokio::main(flavor = "current_thread")]
1567    /// # async fn main() {
1568    /// let (tx, _rx) = mpsc::channel::<()>(5);
1569    ///
1570    /// // both max capacity and capacity are the same at first
1571    /// assert_eq!(tx.max_capacity(), 5);
1572    /// assert_eq!(tx.capacity(), 5);
1573    ///
1574    /// // Making a reservation doesn't change the max capacity.
1575    /// let permit = tx.reserve().await.unwrap();
1576    /// assert_eq!(tx.max_capacity(), 5);
1577    /// // but drops the capacity by one
1578    /// assert_eq!(tx.capacity(), 4);
1579    /// # }
1580    /// ```
1581    ///
1582    /// [`channel`]: channel
1583    /// [`max_capacity`]: Sender::max_capacity
1584    /// [`capacity`]: Sender::capacity
1585    pub fn max_capacity(&self) -> usize {
1586        self.chan.semaphore().bound
1587    }
1588
1589    /// Returns the number of [`Sender`] handles.
1590    pub fn strong_count(&self) -> usize {
1591        self.chan.strong_count()
1592    }
1593
1594    /// Returns the number of [`WeakSender`] handles.
1595    pub fn weak_count(&self) -> usize {
1596        self.chan.weak_count()
1597    }
1598}
1599
1600impl<T> Clone for Sender<T> {
1601    fn clone(&self) -> Self {
1602        Sender {
1603            chan: self.chan.clone(),
1604        }
1605    }
1606}
1607
1608impl<T> fmt::Debug for Sender<T> {
1609    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1610        fmt.debug_struct("Sender")
1611            .field("chan", &self.chan)
1612            .finish()
1613    }
1614}
1615
1616impl<T> Clone for WeakSender<T> {
1617    fn clone(&self) -> Self {
1618        self.chan.increment_weak_count();
1619
1620        WeakSender {
1621            chan: self.chan.clone(),
1622        }
1623    }
1624}
1625
1626impl<T> Drop for WeakSender<T> {
1627    fn drop(&mut self) {
1628        self.chan.decrement_weak_count();
1629    }
1630}
1631
1632impl<T> WeakSender<T> {
1633    /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1634    /// if there are other `Sender` instances alive and the channel wasn't
1635    /// previously dropped, otherwise `None` is returned.
1636    pub fn upgrade(&self) -> Option<Sender<T>> {
1637        chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1638    }
1639
1640    /// Returns the number of [`Sender`] handles.
1641    pub fn strong_count(&self) -> usize {
1642        self.chan.strong_count()
1643    }
1644
1645    /// Returns the number of [`WeakSender`] handles.
1646    pub fn weak_count(&self) -> usize {
1647        self.chan.weak_count()
1648    }
1649}
1650
1651impl<T> fmt::Debug for WeakSender<T> {
1652    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1653        fmt.debug_struct("WeakSender").finish()
1654    }
1655}
1656
1657// ===== impl Permit =====
1658
1659impl<T> Permit<'_, T> {
1660    /// Sends a value using the reserved capacity.
1661    ///
1662    /// Capacity for the message has already been reserved. The message is sent
1663    /// to the receiver and the permit is consumed. The operation will succeed
1664    /// even if the receiver half has been closed. See [`Receiver::close`] for
1665    /// more details on performing a clean shutdown.
1666    ///
1667    /// [`Receiver::close`]: Receiver::close
1668    ///
1669    /// # Examples
1670    ///
1671    /// ```
1672    /// use tokio::sync::mpsc;
1673    ///
1674    /// # #[tokio::main(flavor = "current_thread")]
1675    /// # async fn main() {
1676    /// let (tx, mut rx) = mpsc::channel(1);
1677    ///
1678    /// // Reserve capacity
1679    /// let permit = tx.reserve().await.unwrap();
1680    ///
1681    /// // Trying to send directly on the `tx` will fail due to no
1682    /// // available capacity.
1683    /// assert!(tx.try_send(123).is_err());
1684    ///
1685    /// // Send a message on the permit
1686    /// permit.send(456);
1687    ///
1688    /// // The value sent on the permit is received
1689    /// assert_eq!(rx.recv().await.unwrap(), 456);
1690    /// # }
1691    /// ```
1692    pub fn send(self, value: T) {
1693        use std::mem;
1694
1695        self.chan.send(value);
1696
1697        // Avoid the drop logic
1698        mem::forget(self);
1699    }
1700}
1701
1702impl<T> Drop for Permit<'_, T> {
1703    fn drop(&mut self) {
1704        use chan::Semaphore;
1705
1706        let semaphore = self.chan.semaphore();
1707
1708        // Add the permit back to the semaphore
1709        semaphore.add_permit();
1710
1711        // If this is the last sender for this channel, wake the receiver so
1712        // that it can be notified that the channel is closed.
1713        if semaphore.is_closed() && semaphore.is_idle() {
1714            self.chan.wake_rx();
1715        }
1716    }
1717}
1718
1719impl<T> fmt::Debug for Permit<'_, T> {
1720    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1721        fmt.debug_struct("Permit")
1722            .field("chan", &self.chan)
1723            .finish()
1724    }
1725}
1726
1727// ===== impl PermitIterator =====
1728
1729impl<'a, T> Iterator for PermitIterator<'a, T> {
1730    type Item = Permit<'a, T>;
1731
1732    fn next(&mut self) -> Option<Self::Item> {
1733        if self.n == 0 {
1734            return None;
1735        }
1736
1737        self.n -= 1;
1738        Some(Permit { chan: self.chan })
1739    }
1740
1741    fn size_hint(&self) -> (usize, Option<usize>) {
1742        let n = self.n;
1743        (n, Some(n))
1744    }
1745}
1746impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1747impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1748
1749impl<T> Drop for PermitIterator<'_, T> {
1750    fn drop(&mut self) {
1751        use chan::Semaphore;
1752
1753        if self.n == 0 {
1754            return;
1755        }
1756
1757        let semaphore = self.chan.semaphore();
1758
1759        // Add the remaining permits back to the semaphore
1760        semaphore.add_permits(self.n);
1761
1762        // If this is the last sender for this channel, wake the receiver so
1763        // that it can be notified that the channel is closed.
1764        if semaphore.is_closed() && semaphore.is_idle() {
1765            self.chan.wake_rx();
1766        }
1767    }
1768}
1769
1770impl<T> fmt::Debug for PermitIterator<'_, T> {
1771    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1772        fmt.debug_struct("PermitIterator")
1773            .field("chan", &self.chan)
1774            .field("capacity", &self.n)
1775            .finish()
1776    }
1777}
1778
// ===== impl OwnedPermit =====
1780
1781impl<T> OwnedPermit<T> {
1782    /// Sends a value using the reserved capacity.
1783    ///
1784    /// Capacity for the message has already been reserved. The message is sent
1785    /// to the receiver and the permit is consumed. The operation will succeed
1786    /// even if the receiver half has been closed. See [`Receiver::close`] for
1787    /// more details on performing a clean shutdown.
1788    ///
1789    /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1790    /// the `OwnedPermit` was reserved.
1791    ///
1792    /// [`Receiver::close`]: Receiver::close
1793    ///
1794    /// # Examples
1795    ///
1796    /// ```
1797    /// use tokio::sync::mpsc;
1798    ///
1799    /// # #[tokio::main(flavor = "current_thread")]
1800    /// # async fn main() {
1801    /// let (tx, mut rx) = mpsc::channel(1);
1802    ///
1803    /// // Reserve capacity
1804    /// let permit = tx.reserve_owned().await.unwrap();
1805    ///
1806    /// // Send a message on the permit, returning the sender.
1807    /// let tx = permit.send(456);
1808    ///
1809    /// // The value sent on the permit is received
1810    /// assert_eq!(rx.recv().await.unwrap(), 456);
1811    ///
1812    /// // We may now reuse `tx` to send another message.
1813    /// tx.send(789).await.unwrap();
1814    /// # }
1815    /// ```
1816    pub fn send(mut self, value: T) -> Sender<T> {
1817        let chan = self.chan.take().unwrap_or_else(|| {
1818            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1819        });
1820        chan.send(value);
1821
1822        Sender { chan }
1823    }
1824
1825    /// Releases the reserved capacity *without* sending a message, returning the
1826    /// [`Sender`].
1827    ///
1828    /// # Examples
1829    ///
1830    /// ```
1831    /// use tokio::sync::mpsc;
1832    ///
1833    /// # #[tokio::main(flavor = "current_thread")]
1834    /// # async fn main() {
1835    /// let (tx, rx) = mpsc::channel(1);
1836    ///
1837    /// // Clone the sender and reserve capacity
1838    /// let permit = tx.clone().reserve_owned().await.unwrap();
1839    ///
1840    /// // Trying to send on the original `tx` will fail, since the `permit`
1841    /// // has reserved all the available capacity.
1842    /// assert!(tx.try_send(123).is_err());
1843    ///
1844    /// // Release the permit without sending a message, returning the clone
1845    /// // of the sender.
1846    /// let tx2 = permit.release();
1847    ///
1848    /// // We may now reuse `tx` to send another message.
1849    /// tx.send(789).await.unwrap();
1850    /// # drop(rx); drop(tx2);
1851    /// # }
1852    /// ```
1853    ///
1854    /// [`Sender`]: Sender
1855    pub fn release(mut self) -> Sender<T> {
1856        use chan::Semaphore;
1857
1858        let chan = self.chan.take().unwrap_or_else(|| {
1859            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1860        });
1861
1862        // Add the permit back to the semaphore
1863        chan.semaphore().add_permit();
1864        Sender { chan }
1865    }
1866
1867    /// Returns `true` if permits belong to the same channel.
1868    ///
1869    /// # Examples
1870    ///
1871    /// ```
1872    /// use tokio::sync::mpsc;
1873    ///
1874    /// # #[tokio::main(flavor = "current_thread")]
1875    /// # async fn main() {
1876    /// let (tx, rx) = mpsc::channel::<()>(2);
1877    ///
1878    /// let permit1 = tx.clone().reserve_owned().await.unwrap();
1879    /// let permit2 = tx.clone().reserve_owned().await.unwrap();
1880    /// assert!(permit1.same_channel(&permit2));
1881    ///
1882    /// let (tx2, rx2) = mpsc::channel::<()>(1);
1883    ///
1884    /// let permit3 = tx2.clone().reserve_owned().await.unwrap();
1885    /// assert!(!permit3.same_channel(&permit2));
1886    /// # }
1887    /// ```
1888    pub fn same_channel(&self, other: &Self) -> bool {
1889        self.chan
1890            .as_ref()
1891            .zip(other.chan.as_ref())
1892            .is_some_and(|(a, b)| a.same_channel(b))
1893    }
1894
1895    /// Returns `true` if this permit belongs to the same channel as the given [`Sender`].
1896    ///
1897    /// # Examples
1898    ///
1899    /// ```
1900    /// use tokio::sync::mpsc;
1901    ///
1902    /// # #[tokio::main(flavor = "current_thread")]
1903    /// # async fn main() {
1904    /// let (tx, rx) = mpsc::channel::<()>(1);
1905    ///
1906    /// let permit = tx.clone().reserve_owned().await.unwrap();
1907    /// assert!(permit.same_channel_as_sender(&tx));
1908    ///
1909    /// let (tx2, rx2) = mpsc::channel::<()>(1);
1910    /// assert!(!permit.same_channel_as_sender(&tx2));
1911    /// # }
1912    /// ```
1913    pub fn same_channel_as_sender(&self, sender: &Sender<T>) -> bool {
1914        self.chan
1915            .as_ref()
1916            .is_some_and(|chan| chan.same_channel(&sender.chan))
1917    }
1918}
1919
1920impl<T> Drop for OwnedPermit<T> {
1921    fn drop(&mut self) {
1922        use chan::Semaphore;
1923
1924        // Are we still holding onto the sender?
1925        if let Some(chan) = self.chan.take() {
1926            let semaphore = chan.semaphore();
1927
1928            // Add the permit back to the semaphore
1929            semaphore.add_permit();
1930
1931            // If this `OwnedPermit` is holding the last sender for this
1932            // channel, wake the receiver so that it can be notified that the
1933            // channel is closed.
1934            if semaphore.is_closed() && semaphore.is_idle() {
1935                chan.wake_rx();
1936            }
1937        }
1938
1939        // Otherwise, do nothing.
1940    }
1941}
1942
1943impl<T> fmt::Debug for OwnedPermit<T> {
1944    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1945        fmt.debug_struct("OwnedPermit")
1946            .field("chan", &self.chan)
1947            .finish()
1948    }
1949}